[spark] support lateral inner join for vector search #8252
override protected def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitions {
    outerRows =>
Can batch queries be supported? Batch queries are crucial for performance. You can take a look at the benchmark in https://github.com/apache/paimon-vector-index
Thanks for the reminder. I'll refine it to run in batch mode later.
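For reference, the batching idea discussed above could look roughly like the sketch below: the outer rows of one partition are grouped into fixed-size batches and each batch is sent to the index in a single call. `ToyIndex`, `searchBatch`, and `lateralSearch` are hypothetical stand-ins written for this illustration and are not Paimon or Spark APIs; the brute-force distance scan only simulates an index.

```scala
// Hypothetical sketch: none of these names come from the PR or from Paimon.
object BatchedLookupSketch {
  type Vec = Array[Float]

  // Squared Euclidean distance between two vectors of equal length.
  def dist(a: Vec, b: Vec): Float =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Toy "index": answers a whole batch of queries in one call and counts calls.
  final class ToyIndex(data: Seq[(Int, Vec)]) {
    var calls = 0

    def searchBatch(queries: Seq[Vec], k: Int): Seq[Seq[Int]] = {
      calls += 1
      queries.map { q =>
        data.sortWith((l, r) => dist(q, l._2) < dist(q, r._2)).take(k).map(_._1)
      }
    }
  }

  // Groups the outer rows of one partition into batches of `batchSize`
  // and issues one search call per batch instead of one per row.
  def lateralSearch(
      outer: Iterator[Vec],
      index: ToyIndex,
      k: Int,
      batchSize: Int): Iterator[(Vec, Seq[Int])] =
    outer.grouped(batchSize).flatMap { batch =>
      batch.zip(index.searchBatch(batch, k))
    }
}
```

With three outer rows and `batchSize = 2`, this toy index is called twice rather than three times. Whether batching pays off in the real executor depends on the index implementation, which this sketch does not model.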
Please fix test failures.
_.toPaimonDataField)).asJava)
}
val sparkRow = SparkInternalRow.create(resultRowType)
val vectorSearchBuilder = innerTable
Normal Spark vector_search scans apply pushed partition/data filters before top-k (PaimonBaseScan.evalVectorSearch passes pushedPartitionFilters/pushedDataFilters into the builder). This lateral executor builds the BatchVectorSearchBuilder here without carrying predicates from the search side; PushDownLateralVectorSearchFilter only pushes predicates that reference the left child, so predicates on r.dt or other searched-table columns stay above LateralVectorSearch. A query like ... JOIN LATERAL (...) r WHERE r.dt = '20260420' will pick topK over all partitions and then filter the joined rows, which can return fewer or wrong rows compared with non-lateral vector_search(...) WHERE dt = .... Please preserve search-side filters and apply them via withPartitionFilter/withFilter before newVectorScan()/readBatch(), or reject such predicates explicitly.
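To make the difference concrete, here is a minimal, self-contained sketch of filtering after top-k versus before it. `Doc`, `postFilter`, and `preFilter` are hypothetical names for this illustration only; `score` stands for a precomputed distance to the query (smaller is closer) and nothing here calls Paimon.

```scala
// Hypothetical sketch: plain collections stand in for the searched table.
object FilterBeforeTopK {
  // `score` is an assumed precomputed distance to the query vector.
  final case class Doc(id: Int, dt: String, score: Double)

  // Keeps the k closest documents.
  def topK(docs: Seq[Doc], k: Int): Seq[Doc] =
    docs.sortWith(_.score < _.score).take(k)

  // Top-k over all partitions first, then the partition predicate
  // (the behavior described in the comment above).
  def postFilter(docs: Seq[Doc], k: Int, dt: String): Seq[Doc] =
    topK(docs, k).filter(_.dt == dt)

  // Partition predicate first, then top-k (the requested behavior).
  def preFilter(docs: Seq[Doc], k: Int, dt: String): Seq[Doc] =
    topK(docs.filter(_.dt == dt), k)
}
```

If the two closest documents belong to partition `20260419`, then `postFilter(docs, 2, "20260420")` returns nothing, while `preFilter(docs, 2, "20260420")` returns the two closest documents inside the requested partition.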
scan.plan().splits().asScala.iterator.flatMap {
  split =>
    val reader =
This reader is only closed when this inner iterator is fully exhausted. If a downstream operator short-circuits consumption, for example LIMIT 1/take, or if the task is interrupted, Spark can stop pulling rows before hasNext returns false, leaving the current PaimonRecordReaderIterator and its underlying RecordReader open. Please register a TaskContext completion listener or wrap the returned iterator so the current reader is closed on task completion/cancellation as well as normal exhaustion.
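One possible shape for such a wrapper is sketched below in plain Scala. `ClosingIterator` is a hypothetical helper written for this illustration, not a class from the PR or from Paimon: it closes the resource on normal exhaustion and makes `close()` idempotent, so a completion callback can call it again safely.

```scala
// Hypothetical sketch: a generic wrapper, independent of Spark and Paimon.
object CloseOnCompletion {
  // Iterates over `underlying` and closes `resource` exactly once, either
  // when the iterator is exhausted or when close() is called explicitly.
  final class ClosingIterator[T](underlying: Iterator[T], resource: AutoCloseable)
      extends Iterator[T]
      with AutoCloseable {
    private var closed = false

    override def hasNext: Boolean = {
      val more = !closed && underlying.hasNext
      if (!more) close() // normal exhaustion
      more
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("iterator exhausted or closed")
      underlying.next()
    }

    // Idempotent: repeated calls close the resource only once.
    override def close(): Unit =
      if (!closed) {
        closed = true
        resource.close()
      }
  }
}
```

Inside a Spark task, the wrapper `it` could then be registered with something like `Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => it.close()))`, so the reader is also released when a downstream `LIMIT` stops pulling rows or the task is cancelled. Since splits are read one after another, the listener would need to track whichever reader is currently open; that bookkeeping is left out here.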
Purpose
Purpose: Support lateral join for vector search in Spark.
Linked issue: #8251
Tests
Add vector search with lateral join tests in org.apache.paimon.spark.SparkMultimodalITCase#testVector and org.apache.spark.sql.test.SQLTestUtils#test("lateral vector search preserves subquery alias qualifiers")